package nbcp.rpc


import org.slf4j.LoggerFactory
import org.smartboot.socket.MessageProcessor
import org.smartboot.socket.StateMachineEnum
import org.smartboot.socket.transport.AioSession

import java.io.*
import java.lang.reflect.Proxy
import java.net.SocketTimeoutException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit

/**
 * @author 三刀
 * @version V1.0 , 2018/7/1
 */
class RpcConsumerProcessor : MessageProcessor<ByteArray> {
    private val synchRespMap = ConcurrentHashMap<String, CompletableFuture<RpcResponse>>()
    private val objectMap = ConcurrentHashMap<Class<*>, Any>()
    private var aioSession: AioSession<ByteArray>? = null

    override fun process(session: AioSession<ByteArray>, msg: ByteArray) {
        var objectInput: ObjectInput? = null
        try {
            objectInput = ObjectInputStream(ByteArrayInputStream(msg))
            val resp = objectInput.readObject() as RpcResponse
            synchRespMap[resp.uuid]!!.complete(resp)
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            if (objectInput != null) {
                try {
                    objectInput.close()
                } catch (e: IOException) {
                    e.printStackTrace()
                }

            }
        }

    }

    fun <T> getObject(remoteInterface: Class<T>): T {
        var obj: Any? = objectMap[remoteInterface]
        if (obj != null) {
            return obj as T
        }
        obj = Proxy.newProxyInstance(javaClass.classLoader, arrayOf<Class<*>>(remoteInterface)
        ) { proxy, method, args ->
            val req = RpcRequest()
            req.interfaceClass = remoteInterface.name
            req.method = method.name
            val types = method.parameterTypes
            if (!types.isEmpty()) {
                val paramClass = arrayOfNulls<String>(types.size)
                for (i in types.indices) {
                    paramClass[i] = types[i].name
                }
                req.setParamClassList(*paramClass)
            }
            req.setParams(*args)

            val rmiResp = sendRpcRequest(req)
            if (!rmiResp.exception.isNullOrBlank()) {
                throw RuntimeException(rmiResp.exception)
            }
            rmiResp.returnObject
        } as T
        objectMap[remoteInterface] = obj!!
        return obj
    }

    @Throws(Exception::class)
    private fun sendRpcRequest(request: RpcRequest): RpcResponse {
        val rpcResponseCompletableFuture = CompletableFuture<RpcResponse>()
        synchRespMap[request.uuid] = rpcResponseCompletableFuture

        //输出消息
        val byteArrayOutputStream = ByteArrayOutputStream()
        val objectOutput = ObjectOutputStream(byteArrayOutputStream)
        objectOutput.writeObject(request)
        val data = byteArrayOutputStream.toByteArray()
        synchronized(aioSession!!) {
            aioSession!!.writeBuffer().writeInt(data.size + 4)
            aioSession!!.writeBuffer().write(data)
            aioSession!!.writeBuffer().flush()
        }
        //        aioSession.write(byteArrayOutputStream.toByteArray());

        try {
            return rpcResponseCompletableFuture.get(3, TimeUnit.SECONDS)
        } catch (e: Exception) {
            throw SocketTimeoutException("Message is timeout!")
        }

    }

    override fun stateEvent(session: AioSession<ByteArray>, stateMachineEnum: StateMachineEnum, throwable: Throwable?) {
        when (stateMachineEnum) {
            StateMachineEnum.NEW_SESSION -> {
                this.aioSession = session
            }
            StateMachineEnum.SESSION_CLOSED ->{
                println("Closed")
            }
        }
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(RpcConsumerProcessor::class.java)

        @JvmStatic
        fun main(args: Array<String>) {
            val completableFuture = CompletableFuture<Int>()
            Thread {
                try {
                    println(completableFuture.get())
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                } catch (e: ExecutionException) {
                    e.printStackTrace()
                }
            }.start()

            Thread {
                try {
                    Thread.sleep(2000)
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                }

                completableFuture.complete(null)
            }.start()
        }
    }

}
